{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Pandas fast mutate architecture"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "(Published 27 Oct 2019)\n",
    "\n",
    "## Problem: series operations are type invariant under grouping\n",
    "\n",
    "### What is type variance?\n",
    "\n",
    "In spirit, most pandas operations are one of two functions.\n",
    "\n",
    "* f_elwise(a, [b]) - takes up to two series, returns a result of the same length.\n",
    "* f_agg(a, [b]) - takes up to two series, returns a result whose length is the number of groupings in the data.\n",
    "\n",
    "Assuming that a SeriesGroupBy was built as a subtype of a Series object,\n",
    "in the [Liskov Substitution sense](https://en.wikipedia.org/wiki/Liskov_substitution_principle),\n",
    "this would mean that..\n",
    "\n",
    "* `f_elwise(SeriesGroupBy, SeriesGroupBy) -> Series`\n",
    "\n",
    "could easily support versions that are...\n",
    "\n",
    "* contravariant on input type - e.g. `f_elwise(Series, ...) -> ...`\n",
    "* covariant on output type - e.g. `f_elwise(..., ...) -> SeriesGroupBy`\n",
    "\n",
    "This would be extremely convenient, since it means that defining a function like `f_add = f_elwise(...)`, would support all operations in the python code below...\n",
    "\n",
    "```python\n",
    "from siuba.data import mtcars\n",
    "\n",
    "g_cyl = mtcars.groupby('cyl')\n",
    "\n",
    "# assume this creates the function f_add\n",
    "f_add = f_elwise('add')\n",
    "\n",
    "mpg2 = f_add(mtcars.mpg, mtcars.mpg)     # -> Series\n",
    "g_cyl_mpg2 = f_add(g_cyl.mpg, g_cyl.mpg) # -> SeriesGroupBy\n",
    "```\n",
    "\n",
    "### What does this look like in pandas?\n",
    "\n",
    "The reality is that pandas SeriesGroupBy objects are not subtypes of a Series.\n",
    "More than that, they do not support addition."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "ename": "TypeError",
     "evalue": "unsupported operand type(s) for +: 'SeriesGroupBy' and 'SeriesGroupBy'",
     "output_type": "error",
     "traceback": [
      "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
      "\u001b[0;31mTypeError\u001b[0m                                 Traceback (most recent call last)",
      "\u001b[0;32m<ipython-input-1-b2b9d78e502f>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m      9\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m     10\u001b[0m \u001b[0;31m## Both snippets below raise an error.... :/\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 11\u001b[0;31m \u001b[0mg_cyl\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmpg\u001b[0m \u001b[0;34m+\u001b[0m \u001b[0mg_cyl\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmpg\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m     12\u001b[0m \u001b[0mg_cyl\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0madd\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mg_cyl\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmpg\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;31mTypeError\u001b[0m: unsupported operand type(s) for +: 'SeriesGroupBy' and 'SeriesGroupBy'"
     ]
    }
   ],
   "source": [
    "%%capture\n",
    "import pandas as pd\n",
    "\n",
    "pd.set_option(\"display.max_rows\", 5)\n",
    "\n",
    "from siuba import _\n",
    "from siuba.data import mtcars\n",
    "\n",
    "g_cyl = mtcars.groupby(\"cyl\")\n",
    "\n",
    "## Both snippets below raise an error.... :/\n",
    "g_cyl.mpg + g_cyl.mpg\n",
    "g_cyl.add(g_cyl.mpg)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## How are grouped operations currently handled in pandas?\n",
    "\n",
    "* f_elwise(a, [b]) - is handled using ungrouped pandas object (e.g. Series), or by using the grouped series `.obj` attribute.\n",
    "* f_agg(a, [b]) - is handled using custom SeriesGroupBy methods\n",
    "\n",
    "This is shown below (note that all results are Series)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "cyl\n",
       "4    26.663636\n",
       "6    19.742857\n",
       "8    15.100000\n",
       "Name: mpg, dtype: float64"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# two ways to do it f_elwise\n",
    "ser_mpg2 = mtcars.mpg + mtcars.mpg\n",
    "ser_mpg2 = g_cyl.mpg.obj + g_cyl.mpg.obj\n",
    "\n",
    "# doing grouped aggregate\n",
    "g_cyl.mpg.mean()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## What about composing f_elwise and f_agg operations?\n",
    "\n",
    "Let's take this in two steps\n",
    "\n",
    "1. composing `f_elwise` operations alone\n",
    "2. composing them with f_agg operations\n",
    "\n",
    "### 1) `f_elwise(a, f_elwise(b, [c]))`\n",
    "\n",
    "In this case, since the result could be a Series, or a SeriesGroupBy,\n",
    "it shouldn't be a problem.\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "0     43.0\n",
       "1     43.0\n",
       "      ... \n",
       "30    31.0\n",
       "31    43.8\n",
       "Name: mpg, Length: 32, dtype: float64"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "degroup = lambda ser: getattr(ser, \"obj\", ser)\n",
    "f_add = lambda x, y: degroup(x) + degroup(y)\n",
    "\n",
    "f_add(g_cyl.mpg, f_add(g_cyl.mpg, 1))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Also, as noted in the first section, we are returning a Series here, but functions returning a SeriesGroupBy should also be compatible (so long as we enforce liskov substitution..).\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2) `f_elwise(f_agg(a), f_agg(b)) -> same length result`\n",
    "\n",
    "Suppose we wanted to add the mean `mpg` of each group, to each row of `mpg` in the original data.\n",
    "\n",
    "In our system written above, this would look like...\n",
    "\n",
    "```python\n",
    "f_mean = f_agg('mean')\n",
    "f_add = f_elwise('add')\n",
    "\n",
    "res = f_add(g_cyl.mpg, f_mean(g_cyl.mpg))\n",
    "```\n",
    "\n",
    "Remember that for `f_add`, we laid out in the first section that it should allow functions to be substituted in that take a SeriesGroupBy (or parent type) and returns a Series (or subtype)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "0     142.028571\n",
       "1     142.028571\n",
       "         ...    \n",
       "30    224.314286\n",
       "31    109.300000\n",
       "Length: 32, dtype: float64"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from pandas.core import algorithms\n",
    "\n",
    "\n",
    "def broadcast_agg_result(grouper, result, obj):\n",
    "    # Simplified broadcasting from g_cyl.mpg.transform('mean')\n",
    "    ids, _, ngroup = grouper.group_info\n",
    "    out = algorithms.take_1d(result._values, ids)\n",
    "\n",
    "    return pd.Series(out, index=obj.index, name=obj.name)\n",
    "\n",
    "\n",
    "f_mean = lambda x: broadcast_agg_result(x.grouper, x.mean(), degroup(x))\n",
    "\n",
    "f_add(f_mean(g_cyl.mpg), f_mean(g_cyl.hp))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Notice we can keep going with this, since\n",
    "\n",
    "* f_add(SeriesGroupBy, SeriesGroupby) -> Series\n",
    "* f_mean(SeriesGroupBy, SeriesGroupby) -> Series\n",
    "* we are making SeriesGroupBy a subtype of Series"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "0     163.028571\n",
       "1     163.028571\n",
       "         ...    \n",
       "30    239.314286\n",
       "31    130.700000\n",
       "Length: 32, dtype: float64"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "f_add(g_cyl.mpg, f_add(f_mean(g_cyl.mpg), f_mean(g_cyl.hp)))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "However, there is are two problems here...\n",
    "\n",
    "1. adding two means, or a number to a mean, shouldn't need to broadcast to the length of the data\n",
    "2. in the code above, f_mean(Series) will return a single value (e.g. 1.2)!\n",
    "\n",
    "The main issue is that a Series is implicitly a single group. To get around this, f_elwise should decide when to broadcast, and all operations should return SeriesGroupBy."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 3) `f_elwise(f_agg(a), f_agg(b)) -> agg length result`\n",
    "\n",
    "Above, we had the aggregate return a result the same length as the original data. But this goes against our initial description that f_agg returns a result whose length is the number of groupings.\n",
    "\n",
    "In this case, we need to think more about f_agg's type signature.\n",
    "To do this let's consider a new type, `AggGroupBy`, where...\n",
    "\n",
    "* `AggGroupBy` is a subtype of `SeriesGroupBy`\n",
    "* `AggGroupBy` has 1 row per grouping.\n",
    "* f_agg(a, [b]), with type signature f_agg(`SeriesGroupBy`) -> `AggGroupBy`\n",
    "\n",
    "Finally let's make this drastically simplifying requirement\n",
    "\n",
    "* any operation must take as input either the output of another operation, a literal, or a series using the same grouping.\n",
    "\n",
    "This means that if our operations return grouped Series, then we don't need to worry about the Series case any more. For example, under this system these operations are allowed...\n",
    "\n",
    "* `f_agg(g_cyl.mpg)`\n",
    "* `f_elwise(g_cyl.mpg, 1)`\n",
    "* `f_elwise(f_agg(g_cyl.mpg), g_cyl.mpg)`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pandas.core.groupby import SeriesGroupBy\n",
    "from pandas.core import algorithms\n",
    "\n",
    "\n",
    "# Define Agg Result ----\n",
    "def create_agg_result(ser, orig_object, orig_grouper):\n",
    "    # since pandas groupby method is hard-coded to create a SeriesGroupBy, mock\n",
    "    # AggResult below by making it a SeriesGroupBy whose grouper has 2 extra attributes\n",
    "    obj = ser.groupby(ser.index)\n",
    "    obj.grouper.orig_grouper = orig_grouper\n",
    "    obj.grouper.orig_object = orig_object\n",
    "    return obj\n",
    "\n",
    "\n",
    "def is_agg_result(x):\n",
    "    return hasattr(x, \"grouper\") and hasattr(x.grouper, \"orig_grouper\")\n",
    "\n",
    "\n",
    "# Handling Grouped Operations ----\n",
    "\n",
    "\n",
    "def regroup(ser, grouper):\n",
    "    return ser.groupby(grouper)\n",
    "\n",
    "\n",
    "def degroup(ser):\n",
    "    # returns tuple of (Series or literal, Grouper or None)\n",
    "    # because we can't rely on type checking, use hasattr instead\n",
    "    return getattr(ser, \"obj\", ser), getattr(ser, \"grouper\", None)\n",
    "\n",
    "\n",
    "def f_mean(x):\n",
    "    # SeriesGroupBy -> AggResult\n",
    "    return create_agg_result(x.mean(), x.obj, x.grouper)\n",
    "\n",
    "\n",
    "def broadcast_agg_result(g_ser, compare=None):\n",
    "    \"\"\"Returns a tuple of (Series, final op grouper)\"\"\"\n",
    "    if not isinstance(g_ser, SeriesGroupBy):\n",
    "        return g_ser, compare.grouper\n",
    "    # NOTE: now only applying for agg_result\n",
    "    if not is_agg_result(g_ser):\n",
    "        return degroup(g_ser)\n",
    "\n",
    "    if g_ser.grouper.orig_grouper is compare.grouper:\n",
    "        orig = g_ser.grouper.orig_object\n",
    "        grouper = g_ser.grouper.orig_grouper\n",
    "\n",
    "        # Simplified broadcasting from g_cyl.mpg.transform('mean') implementation\n",
    "        ids, _, ngroup = grouper.group_info\n",
    "        out = algorithms.take_1d(g_ser.obj._values, ids)\n",
    "\n",
    "        return pd.Series(out, index=orig.index, name=orig.name), grouper\n",
    "\n",
    "    return degroup(g_ser)\n",
    "\n",
    "\n",
    "# Define operations ----\n",
    "\n",
    "\n",
    "def f_add(x, y):\n",
    "    # SeriesGroupBy, SeriesGroupBy -> \"\"\n",
    "    broad_x, grouper = broadcast_agg_result(x, y)\n",
    "    broad_y, __ = broadcast_agg_result(y, x)\n",
    "\n",
    "    res = broad_x + broad_y\n",
    "    return regroup(res, grouper)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "cyl\n",
       "4    109.300000\n",
       "6    142.028571\n",
       "8    224.314286\n",
       "dtype: float64"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "grouped_agg = f_add(f_mean(g_cyl.mpg), f_mean(g_cyl.hp))\n",
    "\n",
    "# Notice, only 1 result per group\n",
    "grouped_agg.obj"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "0     163.028571\n",
       "1     163.028571\n",
       "         ...    \n",
       "30    239.314286\n",
       "31    130.700000\n",
       "Name: mpg, Length: 32, dtype: float64"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "grouped_mutate = f_add(g_cyl.mpg, grouped_agg)\n",
    "\n",
    "grouped_mutate.obj"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Decisions\n",
    "\n",
    "Functions should essentially follow...\n",
    "\n",
    "* f_elwise(SeriesGroupBy, ...) -> SeriesGroupBy\n",
    "* f_agg(SeriesGroupBy, ...) -> AggGroupBy\n",
    "\n",
    "We can use a final method at the end to validate, depending on if it's a mutate, summarize, or filter.\n",
    "\n",
    "Additionally...\n",
    "\n",
    "* methods on grouped objects can be simply wrapped to keep LSP over functions (since they have some hard-coded constructors)\n",
    "* I need to investigate ops involving getting dim 0 properties (e.g. dtype)\n",
    "* need a strategy for keeping user-defined, custom groupby ops\n",
    "\n",
    "## Limitations\n",
    "\n",
    "* this considers a closed system of function calls, but right now siu expressions can include things like `__getitem__` etc.."
   ]
  }
 ],
 "metadata": {
  "jupytext": {
   "formats": "ipynb,Rmd"
  },
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
